package org.zjvis.datascience.service;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import org.apache.commons.codec.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zjvis.datascience.common.constant.NoticeConstant;
import org.zjvis.datascience.common.dto.*;
import org.zjvis.datascience.common.enums.TaskTypeEnum;
import org.zjvis.datascience.common.widget.dto.WidgetDTO;
import org.zjvis.datascience.common.widget.enums.WidgetTypeEnum;
import org.zjvis.datascience.common.exception.BaseErrorCode;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.util.JwtUtil;
import org.zjvis.datascience.common.util.MapUtil;
import org.zjvis.datascience.common.util.TaskUtil;
import org.zjvis.datascience.common.vo.DashboardVO;
import org.zjvis.datascience.common.widget.enums.WidgetConfigEnum;
import org.zjvis.datascience.service.cleanup.ExpandBitSet;
import org.zjvis.datascience.service.dag.DAGScheduler;
import org.zjvis.datascience.service.mapper.DashboardMapper;
import org.zjvis.datascience.service.mapper.PipelineMapper;
import org.zjvis.datascience.service.mapper.TaskInstanceMapper;
import org.zjvis.datascience.service.mapper.TaskMapper;
import org.zjvis.datascience.service.socket.SocketIOService;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.zjvis.datascience.common.widget.WidgetJsonConstant.*;

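/**
 * @description Service for dashboard canvases: persistence, copying between projects,
 * publishing and taking published dashboards offline.
 * @date 2021-11-19
 */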
@Service
public class DashboardService {

    // Logger is registered under the literal name "DashboardService" (not the fully-qualified class name).
    private final static Logger logger = LoggerFactory.getLogger("DashboardService");

    // Mapper used for every dashboard read/write in this service.
    @Autowired
    private DashboardMapper dashboardMapper;

    // Mapper used to read, clone and delete pipelines when publishing / un-publishing.
    @Autowired
    private PipelineMapper pipelineMapper;

    // Mapper used to list and delete the tasks that belong to a pipeline or project.
    @Autowired
    private TaskMapper taskMapper;

    // Mapper used to look up and delete task execution records (task instances).
    @Autowired
    private TaskInstanceMapper taskInstanceMapper;

    // Supplies the relation map read after a pipeline's tasks are cloned, and clears it afterwards.
    @Autowired
    private TaskUtil taskUtil;

    // Injected lazily — presumably to break a circular bean dependency; TODO confirm.
    @Lazy
    @Autowired
    private TaskService taskService;

    // Used to copy, publish and look up the widgets referenced by a dashboard layout.
    @Autowired
    private WidgetService widgetService;

    // Injected lazily — presumably to break a circular bean dependency; TODO confirm.
    @Lazy
    @Autowired
    private DAGScheduler dagScheduler;

    // Used to notify the dashboard front end once a pipeline run has been triggered.
    @Autowired
    private SocketIOService socketIOService;

    // Key inside a dashboard's mappingJson that stores the id of the pipeline cloned at publish time.
    private static final String NEW_PIPELINE_ID = "newPipelineId";

    // Key inside a dashboard's mappingJson that stores the relation map obtained from TaskUtil
    // (presumably original task id -> cloned task id; verify against TaskUtil).
    private static final String RELATIONSHIP_MAP = "relationship";

    /**
     * Inserts the given dashboard through the mapper.
     *
     * @param dashboard the dashboard to persist
     * @return the id read back from {@code dashboard} after the insert
     */
    public Long save(DashboardDTO dashboard) {
        this.dashboardMapper.save(dashboard);
        final Long persistedId = dashboard.getId();
        return persistedId;
    }

    /**
     * Writes the given dashboard back through the mapper's update statement.
     *
     * @param dashboard the dashboard carrying the values to store
     */
    public void update(DashboardDTO dashboard) {
        this.dashboardMapper.update(dashboard);
    }

    /**
     * Deletes one dashboard. The mapper's delete statement takes a parameter map whose
     * {@code "ids"} entry is a list, so the single id is wrapped in a one-element list.
     *
     * @param id id of the dashboard to delete
     */
    public void delete(Long id) {
        List<Long> idsToDelete = Lists.newArrayList(id);
        Map<String, Object> deleteParams = Maps.newHashMap();
        deleteParams.put("ids", idsToDelete);
        this.dashboardMapper.delete(deleteParams);
    }

    /**
     * Copies a dashboard into another project.
     *
     * <ol>
     *   <li>Loads the source dashboard and resets its identity / publish fields.</li>
     *   <li>Copies every widget referenced by the layout's {@code gridItems} and rewrites the
     *       layout so that it points at the copies.</li>
     *   <li>Copies the template widgets of every task of the source project (these are not
     *       necessarily placed on the canvas).</li>
     *   <li>Saves the new dashboard.</li>
     * </ol>
     *
     * @param oldId        id of the dashboard to copy
     * @param newProjectId id of the project that receives the copy
     * @param relationMap  passed through unchanged to {@code widgetService.batchInsert}
     *                     (presumably old task id -> new task id; verify against the caller)
     * @throws DataScienceException with {@code DASHBOARD_COPY_FAILED} when the source dashboard
     *                              does not exist or the copy cannot be saved
     */
    @Transactional(rollbackFor = Exception.class)
    public void copy(Long oldId, Long newProjectId, Map<Long, Long> relationMap) throws DataScienceException {
        // 1. Load the source dashboard and turn it into an unsaved, unpublished template.
        DashboardDTO dashboardTemplate = dashboardMapper.queryById(oldId);
        if (null == dashboardTemplate) {
            logger.error("dashboard {} not found, cannot copy it into project {}", oldId, newProjectId);
            throw DataScienceException.of(BaseErrorCode.DASHBOARD_COPY_FAILED,
                    "dashboard " + oldId + " does not exist");
        }
        Long oldProjectId = dashboardTemplate.getProjectId();
        dashboardTemplate.setId(null);
        dashboardTemplate.setPublishNo(null);
        dashboardTemplate.setGmtCreate(null);
        dashboardTemplate.setGmtModify(null);
        dashboardTemplate.setPublishTime(null);
        dashboardTemplate.setPublishLayout(null);
        dashboardTemplate.setProjectId(newProjectId);
        dashboardTemplate.setGmtCreator(JwtUtil.getCurrentUserId());
        dashboardTemplate.setGmtModifier(JwtUtil.getCurrentUserId());
        dashboardTemplate.setStatus(0); // not published

        // 2. Copy the widgets that are placed on the canvas and re-point the layout at the copies.
        String layoutText = dashboardTemplate.getLayout();
        JSONObject oldLayout = StringUtils.isEmpty(layoutText) ? null : JSONObject.parseObject(layoutText);
        JSONArray gridItemArray = null == oldLayout ? null : oldLayout.getJSONArray("gridItems");
        if (null != gridItemArray) {
            List<JSONObject> gridItems = gridItemArray.toJavaList(JSONObject.class);
            // Only items carrying a widget id are copied. Remember exactly those items so the new
            // ids are written back to the right entries even when the layout also contains items
            // without a widget id (the positions in gridItems and in the id list then differ).
            List<JSONObject> widgetItems = new ArrayList<>();
            List<Long> need2processIds = new ArrayList<>();
            for (JSONObject item : gridItems) {
                Long widgetId = null == item ? null : item.getLong("widgetId");
                if (null != widgetId) {
                    widgetItems.add(item);
                    need2processIds.add(widgetId);
                }
            }
            // Assumes batchInsert returns the new ids in the same order as the ids passed in.
            List<Long> updatedIds = widgetService.batchInsert(need2processIds, relationMap);
            int copied = null == updatedIds ? 0 : Math.min(updatedIds.size(), widgetItems.size());
            for (int i = 0; i < copied; i++) {
                widgetItems.get(i).put("widgetId", updatedIds.get(i));
            }
            oldLayout.put("gridItems", gridItems);
            dashboardTemplate.setLayout(oldLayout.toJSONString());
        }

        // 3. Copy the template widgets of every task of the source project
        //    (widgets listed here are not necessarily on the canvas).
        List<Long> oldTaskIds = taskMapper.queryIdByProjectId(oldProjectId);
        if (null != oldTaskIds) {
            for (Long taskId : oldTaskIds) {
                List<WidgetDTO> templates = widgetService.queryTemplateByTaskId(taskId);
                if (null == templates) {
                    continue;
                }
                List<Long> templateIds = templates.stream().map(WidgetDTO::getId).collect(Collectors.toList());
                widgetService.batchInsert(templateIds, relationMap);
            }
        }

        // 4. Persist the copy.
        try {
            dashboardMapper.save(dashboardTemplate);
        } catch (Exception e) {
            // Pass the exception to the logger so the stack trace is not lost.
            logger.error("项目复制过中，复制dashboard失败。", e);
            throw DataScienceException.of(BaseErrorCode.DASHBOARD_COPY_FAILED, e.getMessage());
        }
    }

    /**
     * Looks up a single dashboard by its id.
     *
     * @param id dashboard id
     * @return whatever the mapper returns for that id
     */
    public DashboardDTO queryById(Long id) {
        final DashboardDTO found = this.dashboardMapper.queryById(id);
        return found;
    }

    /**
     * Returns the first dashboard of a project.
     *
     * @param projectId project id
     * @return the first dashboard returned by the mapper, or {@code null} when the project has none
     */
    public DashboardDTO queryByProjectId(Long projectId) {
        // Query once and reuse the result instead of hitting the database a second time.
        List<DashboardDTO> dashboardDTOS = dashboardMapper.queryByProjectId(projectId);
        if (null == dashboardDTOS || dashboardDTOS.isEmpty()) {
            return null;
        }
        return dashboardDTOS.get(0);
    }

    /**
     * Lists every dashboard of a project (one project can own several dashboards).
     *
     * @param projectId project id
     * @return the list returned by the mapper for that project
     */
    public List<DashboardDTO> queryByProjectId2(Long projectId) {
        final List<DashboardDTO> projectDashboards = this.dashboardMapper.queryByProjectId(projectId);
        return projectDashboards;
    }

    /**
     * Looks up a dashboard by its publish number.
     *
     * @param publishNo publish number of the dashboard
     * @return whatever the mapper returns for that publish number
     */
    public DashboardDTO queryByPublishNo(String publishNo) {
        final DashboardDTO published = this.dashboardMapper.queryByPublishNo(publishNo);
        return published;
    }

    /**
     * Clones the given pipeline for a dashboard that is being published and records the
     * clone on the dashboard.
     *
     * The loaded pipeline DTO is reused as the clone: its dataJson gets {@code published=true},
     * its id is cleared, and project id and name are taken from the board before it is saved.
     * The tasks of the original pipeline are then re-pointed at the new pipeline id and handed to
     * {@code taskService.batchProduce}. Finally the board's mappingJson is filled (only when a
     * relation map is available), the board is updated and {@code taskUtil.clear} is called.
     *
     * NOTE(review): a missing pipeline or a pipeline without dataJson would lead to a
     * NullPointerException here — confirm callers always pass a valid pipeline id.
     *
     * @param board      dashboard being published; its mappingJson / isComplex may be modified
     * @param pipelineId id of the pipeline to clone
     * @return id of the newly saved pipeline
     */
    private Long initCorrespondingPipeline(DashboardDTO board, Long pipelineId) {
        // Clone the pipeline row: same DTO, flagged as published, saved under a new id.
        PipelineDTO originalPipeline = pipelineMapper.queryById(pipelineId);
        JSONObject originalDataJson = JSONObject.parseObject(originalPipeline.getDataJson());
        originalDataJson.put("published", Boolean.TRUE);
        originalPipeline.setDataJson(originalDataJson.toJSONString());
        originalPipeline.setId(null);
        originalPipeline.setProjectId(board.getProjectId());
        originalPipeline.setName(board.getName() + "发布pipeline（已锁定）");
        pipelineMapper.save(originalPipeline);
        // The id is read back from the DTO after the save.
        Long newPipelineId = originalPipeline.getId();
        // Copy the contents (tasks) of the original pipeline into the new one.
        List<TaskDTO> oldTaskDTOS = taskMapper.queryByPipeline(pipelineId);
        if (oldTaskDTOS.size() > 0) {
            oldTaskDTOS.forEach(e -> e.setPipelineId(newPipelineId));
            if (!taskService.batchProduce(oldTaskDTOS, pipelineId, true)) {
                throw DataScienceException
                        .of(BaseErrorCode.PROJECT_COPY_FAILED, null, "复制节点过程中失败。");
            }
        }
        JSONObject mappingJson = new JSONObject();
        // Presumably populated as a side effect of batchProduce above — verify against TaskUtil.
        Map<Long, Long> relationMap = taskUtil.getRelationShipByPipelineId(pipelineId);
        if (null != relationMap) {
            mappingJson.put(NEW_PIPELINE_ID, newPipelineId);
            mappingJson.put(RELATIONSHIP_MAP, MapUtil.reMap(relationMap, String.class));
            board.setMappingJson(mappingJson.toJSONString());
            board.setIsComplex(true);
        }
        dashboardMapper.update(board);
        // Presumably discards the relation map cached for the original pipeline — TODO confirm.
        taskUtil.clear(pipelineId, true);
        return newPipelineId;
    }

    /**
     * Publishes a dashboard.
     *
     * If the dashboard already carries a mappingJson (it was published before), the previous
     * publication is taken offline first, including the pipeline cloned for it. A fresh pipeline
     * clone is then created, the current layout is frozen as the publish layout, a publish number
     * is derived from the dashboard id, and every widget referenced by the layout is published.
     *
     * @param vo request data; {@code id} selects the dashboard, {@code pipelineId} the pipeline to
     *           clone (the project's default pipeline is looked up and written into {@code vo}
     *           when it is missing)
     * @return the publish number, or an empty string when the dashboard does not exist
     */
    @Transactional
    public String publish(DashboardVO vo) {
        DashboardDTO board = queryById(vo.getId());
        // Check for a missing board BEFORE touching it; previously the board was dereferenced
        // first, which made the null check unreachable.
        if (null == board) {
            return StringUtils.EMPTY;
        }
        board.setShowWatermark(vo.getShowWatermark());
        if (null == vo.getPipelineId()) {
            vo.setPipelineId(pipelineMapper.getDefaultPipelineId(vo.getProjectId()));
        }

        // A non-empty mappingJson means the board was published before: take the old
        // publication down (and delete its cloned pipeline) before publishing again.
        if (StringUtils.isNotEmpty(board.getMappingJson())) {
            unstuck(vo, true);
        }
        initCorrespondingPipeline(board, vo.getPipelineId());

        String layout = board.getLayout();
        board.setStatus(1);
        board.setPublishLayout(layout);
        board.setPublishTime(LocalDateTime.now());
        // The publish number is a deterministic MD5 of "PUBLISH_|<id>", so re-publishing the
        // same dashboard keeps the same number.
        String publishNo = Hashing.md5().newHasher()
                .putString(Joiner.on("|").join("PUBLISH_", board.getId()), Charsets.UTF_8)
                .hash().toString();
        board.setPublishNo(publishNo);

        // Publish every widget referenced by the layout; a board without layout or grid items
        // simply has nothing to publish.
        JSONObject layoutJson = StringUtils.isEmpty(layout) ? null : JSONObject.parseObject(layout);
        JSONArray widgetJSONArr = null == layoutJson ? null : layoutJson.getJSONArray("gridItems");
        if (null != widgetJSONArr) {
            for (int i = 0; i < widgetJSONArr.size(); i++) {
                JSONObject widgetJson = widgetJSONArr.getJSONObject(i);
                Long widgetId = null == widgetJson ? null : widgetJson.getLong("widgetId");
                if (widgetId != null) {
                    widgetService.publish(widgetId);
                }
            }
        }
        dashboardMapper.update(board);
        return publishNo;
    }


    /**
     * Removes deleted widgets from the layout of a project's first dashboard
     * (called after a pipeline has been deleted).
     *
     * Grid items without a {@code widgetId} are left untouched.
     *
     * @param widgetIds ids of the widgets that no longer exist; nothing happens when null or empty
     * @param projectId project whose first dashboard is cleaned up
     */
    public void updateAfterDelWidgets(List<Long> widgetIds, Long projectId) {
        if (null == widgetIds || widgetIds.isEmpty()) {
            return;
        }
        // Query once; the result was previously fetched up to three times.
        List<DashboardDTO> dashboards = dashboardMapper.queryByProjectId(projectId);
        if (null == dashboards || dashboards.isEmpty()) {
            return;
        }
        DashboardDTO dashboardDTO = dashboards.get(0);
        if (null == dashboardDTO || StringUtils.isEmpty(dashboardDTO.getLayout())) {
            return;
        }
        JSONObject layout = JSONObject.parseObject(dashboardDTO.getLayout());
        JSONArray gridItems = null == layout ? null : layout.getJSONArray("gridItems");
        if (null == gridItems) {
            return;
        }

        ExpandBitSet filter = new ExpandBitSet();
        for (Long widgetId : widgetIds) {
            if (null != widgetId) {
                filter.set(widgetId);
            }
        }

        Iterator<Object> iterator = gridItems.iterator();
        while (iterator.hasNext()) {
            Object next = iterator.next();
            if (!(next instanceof JSONObject)) {
                continue;
            }
            // Items without a widget id must be skipped explicitly; passing null to the filter
            // would otherwise fail.
            Long widgetId = ((JSONObject) next).getLong("widgetId");
            if (null != widgetId && filter.get(widgetId)) {
                iterator.remove();
            }
        }
        dashboardDTO.setLayout(layout.toJSONString());
        dashboardMapper.update(dashboardDTO);
    }

    /**
     * Tells whether the layout of the project's first dashboard references at least one of the
     * given widgets.
     *
     * The layout text is scanned for {@code "widgetId":<id>}. A hit only counts when the id is
     * not immediately followed by another digit, so that widget 12 is not reported as present
     * just because the layout contains widget 123.
     *
     * @param widgets   widgets to look for; {@code null} entries and widgets without id are ignored
     * @param projectId project whose first dashboard is inspected
     * @return {@code true} when any of the widgets is referenced by the layout
     */
    public boolean containsWidget(List<WidgetDTO> widgets, Long projectId) {
        if (null == widgets || widgets.isEmpty()) {
            return false;
        }
        DashboardDTO dashboardDTO = queryByProjectId(projectId);
        if (null == dashboardDTO) {
            return false;
        }
        String layout = dashboardDTO.getLayout();
        if (StringUtils.isEmpty(layout)) {
            return false;
        }
        for (WidgetDTO widget : widgets) {
            if (null == widget || null == widget.getId()) {
                continue;
            }
            String needle = "\"widgetId\":" + widget.getId();
            int hit = layout.indexOf(needle);
            while (hit >= 0) {
                int end = hit + needle.length();
                // Accept the hit only if the number really ends here.
                if (end >= layout.length() || !Character.isDigit(layout.charAt(end))) {
                    return true;
                }
                hit = layout.indexOf(needle, hit + 1);
            }
        }
        return false;
    }

    /**
     * Takes a dashboard offline without deleting the pipeline created for its publication.
     *
     * @param vo request data, forwarded unchanged to {@link #unstuck(DashboardVO, boolean)}
     */
    public void unstuck(DashboardVO vo) {
        final boolean needDelete = false;
        this.unstuck(vo, needDelete);
    }

    /**
     * Takes published dashboards offline (status is set to 0).
     * https://cf.zjvis.org/pages/viewpage.action?pageId=32642646
     *
     * When {@code vo.getId()} resolves to a dashboard, only that dashboard is handled. Otherwise,
     * provided {@code isComplex} is not set on the request, every dashboard of
     * {@code vo.getProjectId()} is handled.
     *
     * @param vo         request data ({@code id}, {@code projectId}, {@code isComplex})
     * @param needDelete when {@code true}, the pipeline cloned for a complex publication is deleted
     *                   together with its tasks and task instances, and the board's mappingJson /
     *                   isComplex are reset
     * @throws DataScienceException with {@code DASHBOARD_PARAM_NOT_VALID} when no dashboard is
     *                              found for the id but {@code isComplex} is set
     */
    @Transactional
    public void unstuck(DashboardVO vo, boolean needDelete) {
        List<DashboardDTO> boards = Lists.newArrayList();
        DashboardDTO dto = dashboardMapper.queryById(vo.getId());
        if (null == dto) {
            // No board found: isComplex must not be set in that case.
            if (vo.getIsComplex() != null) {
                throw DataScienceException.of(BaseErrorCode.DASHBOARD_PARAM_NOT_VALID, "参数非法");
            }
            // Neither id nor isComplex given: take down every dashboard of the project.
            boards = dashboardMapper.queryByProjectId(vo.getProjectId());
        } else {
            boards.add(dto);
        }
        if (null == boards) {
            return;
        }
        for (DashboardDTO board : boards) {
            // Boolean.TRUE.equals avoids an unboxing failure when isComplex was never set.
            if (needDelete && Boolean.TRUE.equals(board.getIsComplex())) {
                JSONObject jsonObject = JSONObject.parseObject(board.getMappingJson());
                if (null != jsonObject) {
                    Long createdPipelineId = jsonObject.getLong(NEW_PIPELINE_ID);
                    // Never hand a null id to the delete statements.
                    if (null != createdPipelineId) {
                        // Delete the pipeline that was created for the publication ...
                        Map<String, Object> params = Maps.newHashMap();
                        params.put("ids", new Long[]{createdPipelineId});
                        pipelineMapper.delete(params);

                        // ... as well as its task nodes and their execution records.
                        taskMapper.deleteByPipelineId(createdPipelineId);
                        taskInstanceMapper.deleteByPipelineId(createdPipelineId);
                    }
                    board.setMappingJson("");
                    board.setIsComplex(false);
                }
            }
            board.setStatus(0);
            dashboardMapper.update(board);
        }
    }

    /**
     * Checks whether a published dashboard needs its pipeline to be executed and, if so,
     * triggers the execution. Execution is triggered at most once: as soon as the selected task
     * has any execution record, nothing is triggered any more.
     *
     * @param dashboard the published dashboard
     * @return {@code true} when an execution was started, {@code false} otherwise (no
     *         configuration widget, no published pipeline recorded in mappingJson, no algorithm
     *         task that needs to run, or the task has been executed before)
     * @throws InterruptedException declared for callers; presumably propagated from the
     *                              scheduler — verify against DAGScheduler
     */
    public Boolean triggerIfNeed(DashboardDTO dashboard) throws InterruptedException {
        if (!checkConfWidgets(dashboard.getPublishLayout())) {
            logger.info("dashboard {}, dont need to trigger, since there is no configuration widget.", dashboard.getId());
            return false;
        }
        String mappingText = dashboard.getMappingJson();
        JSONObject mappingJson = StringUtils.isEmpty(mappingText) ? null : JSONObject.parseObject(mappingText);
        Long newPipelineId = null == mappingJson ? null : mappingJson.getLong(NEW_PIPELINE_ID);
        if (null == newPipelineId) {
            // Without a recorded pipeline there is nothing that could be executed.
            logger.info("dashboard {}, dont need to trigger, since it has no published pipeline.", dashboard.getId());
            return false;
        }

        // Collect the algorithm tasks of the published pipeline that have to be executed.
        List<Long> candidateIds = taskMapper.queryByPipeline(newPipelineId).stream()
                .filter(taskDTO -> taskDTO.getType() != null
                        && taskDTO.getType().equals(TaskTypeEnum.TASK_TYPE_ALGO.getVal()))
                .filter(taskDTO -> {
                    JSONObject dataJson = JSONObject.parseObject(taskDTO.getDataJson());
                    if (null == dataJson) {
                        // A task without dataJson carries no algorithm type; skip it.
                        return false;
                    }
                    Integer algType = dataJson.getIntValue("algType");
                    return WidgetConfigEnum.isNeedAlgo(algType);
                })
                .map(TaskDTO::getId)
                .collect(Collectors.toList());

        if (candidateIds.isEmpty()) {
            logger.info("dashboard {}, dont need to trigger.", dashboard.getId());
            return false;
        }

        Long taskId = dagScheduler.findLCA(candidateIds);
        List<TaskInstanceDTO> instanceDTOList = taskInstanceMapper.queryByTaskId(taskId);
        if (instanceDTOList.size() > 0) {
            // Existing execution records mean the task ran before; do not trigger again.
            logger.info("dashboard {}, dont need to trigger, since it has executed before", dashboard.getId());
            return false;
        }
        logger.info("publish dashboard and execute task {} at pipeline {}", taskId, newPipelineId);
        Long sessionId = dagScheduler.triggerFullDose(newPipelineId, taskId, true);
        // Tell the front end which session to poll for the run status.
        JSONObject params = new JSONObject();
        params.put("sessionId", sessionId);
        params.put("pipelineId", newPipelineId);
        socketIOService.sendToDashboard(NoticeConstant.PIPELINE_QUERY_STATUS, params);
        logger.info("start to monitor sessionId {}'s status at pipeline {}", sessionId, newPipelineId);
        return true;
    }

    /**
     * Checks whether a publish layout contains at least one widget of the configuration type.
     * Without such a widget nothing has to be executed and the front end need not be notified.
     *
     * @param publishLayout layout JSON of the published dashboard; may be null or empty
     * @return {@code true} when any entry of {@code gridItems} has the configuration chart type;
     *         {@code false} otherwise, including for a missing layout or missing grid items
     */
    private boolean checkConfWidgets(String publishLayout) {
        if (StringUtils.isEmpty(publishLayout)) {
            return false;
        }
        JSONObject jsonObject = JSONObject.parseObject(publishLayout);
        JSONArray gridItems = null == jsonObject ? null : jsonObject.getJSONArray("gridItems");
        if (null == gridItems) {
            return false;
        }
        for (int i = 0; i < gridItems.size(); i++) {
            JSONObject item = gridItems.getJSONObject(i);
            // Items without a chartType cannot be configuration widgets.
            String chartType = null == item ? null : item.getString("chartType");
            if (null != chartType && chartType.equals(WidgetTypeEnum.CONFIG.getDesc())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Lists the widgets referenced by the dashboard's current (unpublished) layout.
     *
     * @param dashboardId dashboard id
     * @return result of {@link #returnAllWidgetInBoard(Long, Boolean)} with {@code isPublished = false}
     */
    public List<WidgetDTO> returnAllWidgetInBoard(Long dashboardId) {
        final Boolean isPublished = Boolean.FALSE;
        return this.returnAllWidgetInBoard(dashboardId, isPublished);
    }

    /**
     * Lists the widgets referenced by a dashboard layout (grid items plus tables).
     *
     * Entries without a widget id are skipped — the original note says data-table charts have
     * none. Widget ids that no longer resolve to a widget are skipped as well, so the result
     * never contains {@code null}.
     *
     * @param dashboardId dashboard id
     * @param isPublished {@code true} to read the publish layout; {@code false} or {@code null}
     *                    to read the current layout
     * @return the referenced widgets; an empty list when the dashboard, its layout or its grid
     *         items are missing
     */
    public List<WidgetDTO> returnAllWidgetInBoard(Long dashboardId, Boolean isPublished) {
        DashboardDTO dashboard = dashboardMapper.queryById(dashboardId);
        if (null == dashboard) {
            return Lists.newArrayList();
        }
        // Boolean.TRUE.equals tolerates a null flag instead of failing on unboxing.
        String layout = Boolean.TRUE.equals(isPublished) ? dashboard.getPublishLayout() : dashboard.getLayout();
        if (StringUtils.isEmpty(layout)) {
            return Lists.newArrayList();
        }
        JSONObject jsonObject = JSONObject.parseObject(layout);
        if (null == jsonObject || !jsonObject.containsKey(GRID_ITEMS)) {
            return Lists.newArrayList();
        }

        // Both sections are optional in practice; a layout without tables must not fail.
        List<JSONObject> items = new ArrayList<>();
        JSONArray gridItemArray = jsonObject.getJSONArray(GRID_ITEMS);
        if (null != gridItemArray) {
            items.addAll(gridItemArray.toJavaList(JSONObject.class));
        }
        JSONArray tableArray = jsonObject.getJSONArray(TABLES);
        if (null != tableArray) {
            items.addAll(tableArray.toJavaList(JSONObject.class));
        }

        return items.stream()
                .filter(item -> null != item && null != item.getLong(WIDGET_ID))
                .map(item -> widgetService.queryById(item.getLong(WIDGET_ID)))
                .filter(widget -> null != widget)
                .collect(Collectors.toList());
    }

    /**
     * Returns the task ids used by the widgets of a dashboard.
     *
     * @param dashboardId dashboard id
     * @param isPublished forwarded to {@link #returnAllWidgetInBoard(Long, Boolean)} to choose
     *                    between the publish layout and the current layout
     * @return one task id per widget, in the order the widgets are returned
     */
    public List<Long> returnAllUsedTaskInBoard(Long dashboardId, Boolean isPublished) {
        List<WidgetDTO> widgetsOnBoard = returnAllWidgetInBoard(dashboardId, isPublished);
        List<Long> usedTaskIds = new ArrayList<>();
        for (WidgetDTO widget : widgetsOnBoard) {
            usedTaskIds.add(widget.getTid());
        }
        return usedTaskIds;
    }

}
